// Copyright (c) 2022 by Duguang.IO Inc. All Rights Reserved.
// Author: Ethan Liu
// Date: 2022-05-04 14:19:25

package engine

import (
	"archive/tar"
	"bytes"
	"context"
	"io"
	"io/ioutil"

	"jianmu-worker-docker/docker/errors"
	"jianmu-worker-docker/docker/image"
	"jianmu-worker-docker/docker/jsonmessage"
	logger "jianmu-worker-docker/logging"
	"jianmu-worker-docker/registry"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
)

// Opts Docker引擎配置
type Opts struct {
	HidePull bool
}

// Docker Docker任务引擎实现
type Docker struct {
	client   client.APIClient
	hidePull bool
}

// New 创建引擎
func New(client client.APIClient, opts Opts) *Docker {
	return &Docker{
		client:   client,
		hidePull: opts.HidePull,
	}
}

// NewEnv 根据环境变量创建引擎
func NewEnv(opts Opts) (*Docker, error) {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}
	return New(cli, opts), nil
}

// Ping ping Docker守护进程
func (e *Docker) Ping(ctx context.Context) error {
	_, err := e.client.Ping(ctx)
	return err
}

func (e *Docker) FindVolumes(ctx context.Context) ([]string, error) {
	fs := filters.NewArgs()
	fs.Add("label", "creator=jianmu")
	volumes, err := e.client.VolumeList(ctx, fs)
	if err != nil {
		return nil, err
	}
	var slice []string
	for _, vol := range volumes.Volumes {
		slice = append(slice, vol.Name)
	}
	return slice, err
}

func (e *Docker) FindContainers(ctx context.Context) ([]string, error) {
	fs := filters.NewArgs()
	fs.Add("label", "creator=jianmu")
	containers, err := e.client.ContainerList(ctx, types.ContainerListOptions{
		Size:    true,
		All:     true,
		Filters: fs,
	})
	if err != nil {
		return nil, err
	}
	var slice []string
	for _, c := range containers {
		slice = append(slice, c.Names[0][1:])
	}
	return slice, nil
}

// CreateVolume 创建Volume
func (e *Docker) CreateVolume(ctx context.Context, name string) error {
	labels := map[string]string{"creator": "jianmu"}
	_, err := e.client.VolumeCreate(ctx, volume.VolumeCreateBody{
		Name:   name,
		Driver: "local",
		Labels: labels,
	})
	if err != nil {
		logger.FromContext(ctx).
			WithField("Volume name", name).
			Info("cannot create")
		return errors.TrimExtraInfo(err)
	}
	return nil
}

// DeleteVolume 删除Volume
func (e *Docker) DeleteVolume(ctx context.Context, name string) error {
	err := e.client.VolumeRemove(ctx, name, true)
	if err != nil {
		logger.FromContext(ctx).
			WithField("Volume name", name).
			Info("cannot remove")
		return errors.TrimExtraInfo(err)
	}
	return nil
}

func (e *Docker) DeleteImage(ctx context.Context, name string) {
	removeOpts := types.ImageRemoveOptions{
		Force:         true,
		PruneChildren: false,
	}
	e.client.ImageRemove(ctx, name, removeOpts)
}

// Run 运行任务容器
func (e *Docker) Run(ctx context.Context, runner *Runner, output io.Writer) (*State, error) {
	log := logger.FromContext(ctx).
		WithField("runner.id", runner.ID)
	// 创建容器
	log.Debugln("创建容器")
	err := e.create(ctx, runner, output)
	if err != nil {
		log.WithError(err).Errorln("容器创建失败")
		return nil, errors.TrimExtraInfo(err)
	}
	var bt bytes.Buffer
	bt.WriteString("使用容器: ")
	bt.WriteString(runner.ID)
	bt.WriteString(" 执行任务")
	bt.WriteString("\n")
	output.Write(bt.Bytes())

	// 启动容器
	log.Debugln("启动容器")
	err = e.start(ctx, runner.ID)
	if err != nil {
		log.WithError(err).Errorln("启动容器失败")
		return nil, errors.TrimExtraInfo(err)
	}
	// 获取容器输出
	log.Debugln("获取容器输出")
	err = e.tail(ctx, runner.ID, output)
	if err != nil {
		return nil, errors.TrimExtraInfo(err)
	}
	// 等待容器结果
	return e.waitRetry(ctx, runner)
}

func (e *Docker) Resume(ctx context.Context, runner *Runner, output io.Writer) (*State, error) {
	log := logger.FromContext(ctx).
		WithField("runner.id", runner.ID)
	// 获取容器输出
	log.Debugln("获取容器输出")
	err := e.tail(ctx, runner.ID, output)
	if err != nil {
		return nil, errors.TrimExtraInfo(err)
	}
	// 等待容器结果
	return e.waitRetry(ctx, runner)
}

func (e *Docker) Clean(ctx context.Context, runner *Runner) error {
	removeOpts := types.ContainerRemoveOptions{
		Force:         true,
		RemoveLinks:   false,
		RemoveVolumes: true,
	}
	err := e.client.ContainerRemove(ctx, runner.ID, removeOpts)
	if err != nil {
		return errors.TrimExtraInfo(err)
	}
	return nil
}

func (e *Docker) Attach(ctx context.Context, id string) (*types.HijackedResponse, error) {
	// 执行/bin/bash命令
	idr, err := e.client.ContainerExecCreate(ctx, id, types.ExecConfig{
		AttachStdin:  true,
		AttachStdout: true,
		AttachStderr: true,
		Cmd:          []string{"/bin/bash"},
		Tty:          true,
	})
	if err != nil {
		return nil, err
	}
	hr, err := e.client.ContainerExecAttach(ctx, idr.ID, types.ExecStartCheck{Detach: false, Tty: true})
	if err != nil {
		return nil, err
	}
	return &hr, nil
}

// 拉取镜像并创建容器
func (e *Docker) create(ctx context.Context, runner *Runner, output io.Writer) error {
	// 创建拉取参数，如果存在认证信息则编码
	pullopts := types.ImagePullOptions{}
	if runner.Auth != nil {
		pullopts.RegistryAuth = registry.Header(
			runner.Auth.Username,
			runner.Auth.Password,
		)
	}
	// 如果拉取策略为Always或镜像未设置标签则拉取最新镜像
	if runner.Pull == PullAlways ||
		(runner.Pull == PullDefault && image.IsLatest(runner.Spec.Image)) {
		rc, pullerr := e.client.ImagePull(ctx, runner.Spec.Image, pullopts)
		if pullerr == nil {
			if e.hidePull {
				io.Copy(io.Discard, rc)
			} else {
				jsonmessage.Copy(rc, output)
			}
			rc.Close()
		}
		if pullerr != nil {
			return pullerr
		}
	}

	// 创建容器
	_, err := e.client.ContainerCreate(ctx,
		toConfig(&runner.Spec, e.client.DaemonHost()),
		toHostConfig(&runner.Spec, e.client.DaemonHost(), runner.Spec.NetworkMode),
		nil,
		runner.ID,
	)

	// 如果镜像不存在则自动拉取镜像并重新创建容器
	if client.IsErrNotFound(err) && runner.Pull != PullNever {
		rc, pullerr := e.client.ImagePull(ctx, runner.Spec.Image, pullopts)
		if pullerr != nil {
			return pullerr
		}

		if e.hidePull {
			io.Copy(ioutil.Discard, rc)
		} else {
			jsonmessage.Copy(rc, output)
		}
		rc.Close()

		// 重新创建容器
		_, err = e.client.ContainerCreate(ctx,
			toConfig(&runner.Spec, e.client.DaemonHost()),
			toHostConfig(&runner.Spec, e.client.DaemonHost(), runner.Spec.NetworkMode),
			nil,
			runner.ID,
		)
	}
	if err != nil {
		return err
	}

	// 指定容器使用用户定义的网络
	if runner.Spec.Network == "" {
		for _, net := range runner.Spec.Networks {
			err = e.client.NetworkConnect(ctx, net, runner.ID, &network.EndpointSettings{
				Aliases: []string{net},
			})
			if err != nil {
				return nil
			}
		}
	}

	return nil
}

// 启动容器
func (e *Docker) start(ctx context.Context, id string) error {
	return e.client.ContainerStart(ctx, id, types.ContainerStartOptions{})
}

// 等待容器执行完成并返回退出码
func (e *Docker) wait(ctx context.Context, id string) (*State, error) {
	wait, errc := e.client.ContainerWait(ctx, id, container.WaitConditionNotRunning)
	select {
	case <-wait:
	case <-errc:
	}

	info, err := e.client.ContainerInspect(ctx, id)
	if err != nil {
		return nil, err
	}

	return &State{
		Exited:    !info.State.Running,
		ExitCode:  info.State.ExitCode,
		OOMKilled: info.State.OOMKilled,
	}, nil
}

// 等待容器执行完成并返回退出码
func (e *Docker) waitRetry(ctx context.Context, runner *Runner) (*State, error) {
	log := logger.FromContext(ctx).
		WithField("runner.id", runner.ID)
	for {
		// 当超时或被终止时，context收到取消通知，退出并返回错误
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		state, err := e.wait(ctx, runner.ID)
		if err != nil {
			return nil, err
		}
		log.Debugln("容器运行完成")
		result, err := e.getResultFile(ctx, runner.ID, runner.ResultFile)
		if err != nil {
			log.WithError(err).WithField("runner.ResultFile", runner.ResultFile).Debugln("无法获取结果文件")
			return nil, err
		}
		log.Debugln("获取结果文件完成")
		state.ResultFile = result
		if state.Exited {
			return state, err
		}
		logger.FromContext(ctx).
			WithField("container", runner.ID).
			Trace("docker wait exited unexpectedly")
	}
}

// 获取结果文件
func (e *Docker) getResultFile(ctx context.Context, id string, filepath string) (string, error) {
	if filepath == "" {
		return "", nil
	}
	reader, _, err := e.client.CopyFromContainer(ctx, id, filepath)
	if err != nil {
		return "", err
	}
	logger.FromContext(ctx).
		WithField("runner.id", id).Debugln("get reader")
	tr := tar.NewReader(reader)
	logger.FromContext(ctx).
		WithField("runner.id", id).Debugln("获取结果文件压缩包")
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break // End of archive
		}
		if err != nil {
			return "", err
		}
		if hdr.Name != "" {
			data, err := io.ReadAll(tr)
			if err != nil {
				logger.FromContext(ctx).
					WithError(err).Errorln("结果文件读取错误")
			}
			logger.FromContext(ctx).
				WithField("filename", hdr.Name).Debugln(string(data))
			return string(data), nil
		}
	}
	return "", nil
}

// 获取容器日志
func (e *Docker) tail(ctx context.Context, id string, output io.Writer) error {
	opts := types.ContainerLogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	}

	log, err := e.client.ContainerLogs(ctx, id, opts)
	if err != nil {
		return err
	}

	go func() {
		stdcopy.StdCopy(output, output, log)
		log.Close()
	}()
	return nil
}
